package codemperor.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;


/**
 * HDFS word count: recursively reads every file under an HDFS input
 * directory, tallies whitespace-separated words, writes the counts to an
 * HDFS result file as {@code word:count} lines, then echoes that result
 * file to stdout.
 */
public class HDFSWordCount {

    public static void main(String[] args) throws Exception {
        /********** Configuration **********/
        Configuration configuration = new Configuration();
        // Replication factor 1 — suitable for a single-node / test cluster.
        configuration.set("dfs.replication", "1");
        // Resolve datanodes by hostname so clients outside the cluster
        // network (e.g. behind NAT) can reach them.
        configuration.set("dfs.client.use.datanode.hostname", "true");
        URI uri = new URI("hdfs://master:9000");
        FileSystem fs = FileSystem.get(uri, configuration);

        try {
            /********** In-memory word tallies **********/
            Map<String, Integer> map = new HashMap<>();

            /********** Read input files **********/
            Path input = new Path("/hdfsapi/wordcount");
            // 'true' = recurse into subdirectories.
            RemoteIterator<LocatedFileStatus> ri = fs.listFiles(input, true);
            while (ri.hasNext()) {
                LocatedFileStatus lf = ri.next();
                // try-with-resources closes both streams even if reading fails.
                try (FSDataInputStream in = fs.open(lf.getPath());
                     BufferedReader reader = new BufferedReader(
                             new InputStreamReader(in, StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        // \s+ collapses runs of whitespace; split(" ") would
                        // emit empty tokens between consecutive spaces.
                        for (String word : line.split("\\s+")) {
                            if (word.isEmpty()) {
                                continue; // leading whitespace yields one empty token
                            }
                            // merge replaces the null-check / box-increment dance.
                            map.merge(word, 1, Integer::sum);
                        }
                    }
                }
            }

            /********** Write result to HDFS **********/
            Path output = new Path("/hdfsapi/output/wordcount_result");
            try (FSDataOutputStream out = fs.create(output)) {
                for (Map.Entry<String, Integer> entry : map.entrySet()) {
                    // Write plain UTF-8 text; writeUTF would prepend a 2-byte
                    // binary length to every entry and corrupt the text file.
                    String record = entry.getKey() + ":" + entry.getValue() + "\n";
                    out.write(record.getBytes(StandardCharsets.UTF_8));
                }
            }
            System.out.println("统计成功");

            /********** Echo the result file to stdout **********/
            try (FSDataInputStream result = fs.open(output)) {
                IOUtils.copyBytes(result, System.out, 1024);
            }
        } finally {
            fs.close();
        }
    }
}
